import os
import re
import scipy.io.wavfile as wav
import numpy as np
sampling_rate = 8000
# Development dataset
dev_path = './free-spoken-digit/dev'
dev_X = []
dev_y = []
for wav_file in os.listdir(dev_path):
dev_y.append(re.split('[._]', wav_file)[1])
dev_X.append(wav.read(dev_path + '/' + wav_file)[1])
if wav.read(dev_path + '/' + wav_file)[0] != sampling_rate:
print('Sampling error in: ' + dev_path + '/' + wav_file)
# Evaluation dataset
eval_path = './free-spoken-digit/eval'
eval_X = []
for wav_file in os.listdir(eval_path):
eval_X.append(wav.read(eval_path + '/' + wav_file)[1])
if wav.read(eval_path + '/' + wav_file)[0] != sampling_rate:
print('Sampling error in: ' + eval_path + '/' + wav_file)
import matplotlib.pyplot as plt
# Signals: first visualization
fig, ax = plt.subplots(2, 3, figsize=(14,4))
for i in range(2):
for j in range(3):
ax[i][j].plot(dev_X[i*3+j])
plt.tight_layout()
plt.show()
# Signal vectors size
dev_sizes = [el.size for el in dev_X]
eval_sizes = [el.size for el in eval_X]
fig, ax = plt.subplots(1,2,figsize=(13,4))
ax[0].hist(dev_sizes, bins=200)
ax[0].set_title('dev signals length')
ax[1].hist(eval_sizes, bins=200)
ax[1].set_title('eval signals length')
plt.tight_layout()
plt.show()
# Developement dataset
X_d = np.array(dev_X)
y_d = np.array(dev_y)
# Evaluation dataset
X_e = np.array(eval_X)
print(X_d.shape)
print(y_d.shape)
print(X_e.shape)
# Max and min amplitude for each signal
dev_max_amp = [np.max(sig) for sig in X_d]
dev_min_amp = [np.min(sig) for sig in X_d]
eval_max_amp = [np.max(sig) for sig in X_e]
eval_min_amp = [np.min(sig) for sig in X_e]
fig, ax = plt.subplots(2,2,figsize=(13,4))
ax[0][0].hist(dev_max_amp, bins=200)
ax[0][0].set_title('dev max amplitude')
ax[0][1].hist(dev_min_amp, bins=200)
ax[0][1].set_title('dev min amplitude')
ax[1][0].hist(eval_max_amp, bins=200)
ax[1][0].set_title('eval max amplitude')
ax[1][1].hist(eval_min_amp, bins=200)
ax[1][1].set_title('eval min amplitude')
plt.tight_layout()
plt.show()
# Max and min amplitude within a class
sig_class = '7'
dev_max_amp = [np.max(sig) for sig in X_d[y_d == sig_class]]
dev_min_amp = [np.min(sig) for sig in X_d[y_d == sig_class]]
fig, ax = plt.subplots(1,2,figsize=(13,4))
ax[0].hist(dev_max_amp, bins=200)
ax[0].set_title('dev max amplitude')
ax[1].hist(dev_min_amp, bins=200)
ax[1].set_title('dev min amplitude')
plt.tight_layout()
plt.show()
Amplitude isn't constant in the same class
# Amplitude rescaled to 1
for i in range(X_d.size):
X_d[i] = X_d[i] / np.max(np.abs(X_d[i]))
for i in range(X_e.size):
X_e[i] = X_e[i] / np.max(np.abs(X_e[i]))
# Max and min amplitude for each signal
dev_max_amp = [np.max(sig) for sig in X_d]
dev_min_amp = [np.min(sig) for sig in X_d]
eval_max_amp = [np.max(sig) for sig in X_e]
eval_min_amp = [np.min(sig) for sig in X_e]
fig, ax = plt.subplots(2,2,figsize=(13,4))
ax[0][0].hist(dev_max_amp, bins=200)
ax[0][0].set_title('dev max amplitude')
ax[0][1].hist(dev_min_amp, bins=200)
ax[0][1].set_title('dev min amplitude')
ax[1][0].hist(eval_max_amp, bins=200)
ax[1][0].set_title('eval max amplitude')
ax[1][1].hist(eval_min_amp, bins=200)
ax[1][1].set_title('eval min amplitude')
plt.tight_layout()
plt.show()
L'obiettivo è quello di trimmare l'audio all'inizio e fine esatti di quando viene pronunciata la parola.
La lunghezza di una parola è sicuramente significativa sul suo contenuto ed eccetera...
Inoltre si può iterare questa tecnica per isolare delle sotto parti dell'onda (da provare).
def CDF(v):
"""Given a vector, returns the CDF vector of it"""
cdf = np.zeros(v.size)
cdf[0] = v[0]
for i in range(1,v.size):
cdf[i] = cdf[i-1] + v[i]
return cdf / np.sum(v)
def lower_cdf_area(cdf, threshold=.01):
"""Ritorna l'indice della cdf per cui l'area a sx vale 'threshold'"""
area_tot = np.sum(cdf)
area = area_tot * threshold
lower = 0
upper = cdf.size - 1
# Lower area
a = 0
for i in range(cdf.size):
if a >= area/2:
break
a += cdf[i]
return i
def lower_cdf_perc(cdf, threshold=.01):
"""Ritorna l'indice della cdf per cui essa vale 'threshold'"""
i = 0
while cdf[i] < threshold and i < cdf.size:
i += 1
return i
E' molto efficace per ridurre il sumore su un segnale se calcolata con finistra piccola
def moving_average(a, n=10):
"Media mobile"
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
# Es
fig, ax = plt.subplots(1,2,figsize=(10,4))
ax[0].plot(X_d[0])
ax[0].set_title('Segnale originale')
ax[1].plot(moving_average(X_d[0],10))
ax[1].set_title('Media mobile (finestra = 10)')
plt.tight_layout()
plt.show()
# Es pipeline
fig, ax = plt.subplots(4,1,figsize=(12,15))
# Segnale
sig = X_d[5] #5
# moving avg
mov_avg = moving_average(sig, 10)
ax[0].plot(sig)
ax[0].plot(mov_avg, linewidth=2)
ax[0].set_title('Media mobile')
# abs(mov avg) filtered
mov_avg_abs = np.abs(mov_avg)
mov_avg_abs = mov_avg_abs/np.max(mov_avg_abs)
mov_avg_abs = np.where(mov_avg_abs < .12, 0, mov_avg_abs) #.1 buono
ax[1].plot(mov_avg_abs)
ax[1].set_title('Media mobile filtrata')
# cdf
cdf = CDF(mov_avg_abs)
ax[2].plot(cdf)
ax[2].set_title('CDF')
#upper & lower percentuali
lower_p = lower_cdf_perc(cdf, threshold=.0025)
upper_p = cdf.size - 1 - lower_cdf_perc(np.flip(1-cdf), threshold=.0025)
# segnale trimmato
ax[3].plot(sig)
ax[3].plot(lower_p, 0, '|r', markersize=100)
ax[3].plot(upper_p, 0, '|r', markersize=100)
ax[3].set_title('Trim')
plt.tight_layout()
plt.show()
def upper_lower(sig):
# moving avg
mov_avg = moving_average(sig, 10)
# abs(mov avg) filtered
mov_avg_abs = np.abs(mov_avg)
mov_avg_abs = mov_avg_abs/np.max(mov_avg_abs)
mov_avg_abs = np.where(mov_avg_abs < .12, 0, mov_avg_abs) #.1 buono
# cdf
cdf = CDF(mov_avg_abs)
# upper & lower aree
#lower = lower_cdf_area(cdf, threshold=1e-5)
#upper = cdf.size - 1 - lower_cdf_area(np.flip(1-cdf), threshold=31e-5) #3e-4 buono, 32e-5 troppo
# Si osserva che upper & lower percentuali è più flessibile della versione sulle aree!
# upper & lower percentuali
lower = lower_cdf_perc(cdf, threshold=.0025)
upper = cdf.size - 1 - lower_cdf_perc(np.flip(1-cdf), threshold=.0025)
return upper,lower
n_sig = 50
n_grafici_per_sig = 1
n_plot = n_sig * n_grafici_per_sig
fig, ax = plt.subplots(n_plot,1,figsize=(10,4*n_plot))
for i in range(0,n_plot,n_grafici_per_sig):
sig = X_d[i//n_grafici_per_sig]
upper, lower = upper_lower(sig)
# segnale trimmato
ax[i].plot(sig)
ax[i].plot(lower, 0, '|r', markersize=100)
ax[i].plot(upper, 0, '|r', markersize=100)
ax[i].set_title('Signal: ' + str(i))
plt.tight_layout()
plt.show()
fig, ax = plt.subplots(4,1,figsize=(15,15))
sig = dev_X[49]
sig = sig / np.max(np.abs(sig))
ax[0].plot(sig)
sq = sig**2
sq = sq / np.max(sq)
sq = np.where(sq < .02, 0, sq)
ax[1].plot(sq)
cdf = CDF(sq)
ax[2].plot(cdf)
thre = .0002
lower = lower_cdf_perc(cdf, threshold=.0002)
upper = cdf.size - 1 - lower_cdf_perc(np.flip(1-cdf), threshold=.002)
ax[3].plot(sig)
ax[3].plot(lower, 0, '|r', markersize=100)
ax[3].plot(upper, 0, '|r', markersize=100)
plt.tight_layout()
plt.plot();
for i in range(X_d.size):
upp, low = upper_lower(X_d[i])
X_d[i] = X_d[i][low:upp+1]
for i in range(X_e.size):
upp, low = upper_lower(X_e[i])
X_e[i] = X_e[i][low:upp+1]
plt.plot(X_d[7]);
# Signal vectors size
dev_sizes = [el.size for el in X_d]
eval_sizes = [el.size for el in X_e]
fig, ax = plt.subplots(1,2,figsize=(13,4))
ax[0].hist(dev_sizes, bins=200)
ax[0].set_title('dev signals length')
ax[1].hist(eval_sizes, bins=200)
ax[1].set_title('eval signals length')
plt.tight_layout()
plt.show()
# Com'è possibile che ci siano ancora segnali lunghi più di 4000?
i_lunghi = sorted([(i,X_d[i].size) for i in range(X_d.size) if X_d[i].size > 4000], key=lambda x: -x[1])
i_lunghi = list(map(lambda x:x[0],i_lunghi))
lunghi = len(i_lunghi)
fig, ax = plt.subplots(lunghi,1,figsize=(13,4*lunghi))
for j,i in enumerate(i_lunghi):
u,l = upper_lower(dev_X[i])
ax[j].plot(dev_X[i])
ax[j].plot(l,0,'r|',markersize=100)
ax[j].plot(u,0,'r|',markersize=100)
ax[j].set_title(i)
plt.tight_layout()
plt.plot();
# Conteggio scartati
max_size = 4500
print(f'dev X scartati: {len([el.size for el in X_d if el.size > max_size])/len(X_d)*100:.2f} %')
print(f'eval X scartati: {len([el.size for el in X_e if el.size > max_size])/len(X_e)*100:.2f} %')
# Filtraggio
dev_idx = [i for i in range(X_d.size) if X_d[i].size > max_size]
X_d = np.delete(X_d, dev_idx)
y_d = np.delete(y_d, dev_idx)
X_e = np.delete(X_e, [i for i in range(X_e.size) if X_e[i].size > max_size])
# Signal vectors size
dev_sizes = [el.size for el in X_d]
eval_sizes = [el.size for el in X_e]
fig, ax = plt.subplots(1,2,figsize=(13,4))
ax[0].hist(dev_sizes, bins=200)
ax[0].set_title('dev signals length')
ax[1].hist(eval_sizes, bins=200)
ax[1].set_title('eval signals length')
plt.tight_layout()
plt.show()
fig, ax = plt.subplots(7,1,figsize=(15,27))
sig = dev_X[829] #dev_X[i_lunghi[3]]
sig = sig / np.max(np.abs(sig))
win = sig.size // 10
ax[0].plot(sig)
#mov_avg = moving_average(sig,10)
#mov_avg = mov_avg/np.max(np.abs(mov_avg))
# è il miogliore perchè non attenua troppo le 'gobbe' minori, ma sempre importanti (vedi dev_X[688])
ax[1].plot(sig**2)
ax[2].plot(np.convolve(sig**2,np.ones(win), mode='same'))
#mov_avg = mov_avg**3#moving_average(np.abs(sig),10)
ax[3].plot(sig**3)
ax[4].plot(np.convolve(np.abs(sig**3),np.ones(win), mode='same'))
ax[5].plot(sig**4)
ax[6].plot(np.convolve(sig**4,np.ones(win), mode='same'))
plt.tight_layout()
plt.plot();
def zero_crossing_idx(v):
tol = 1e-8
idx = []
i = 0
while np.abs(v[i]) < tol and i < v.size:
i += 1
prec = v[i]
for j in range(i+1,v.size):
if np.abs(v[j]) < tol:
continue
if v[j] * prec < 0:
idx.append(j)
prec = v[j]
return idx
def der(sig, sampl_rate=8000):
t = 1/sampl_rate
d = np.zeros(sig.size - 1)
for i in range(sig.size - 1):
d[i] = (sig[i+1]-sig[i])/t
return d
def staz(sig):
"""Calcola i punti stazionari del segnale"""
return zero_crossing_idx(der(sig))
def massimi(sig, window=None):
"""Calcola i massimi del segnale"""
if window is None:
window = sig.size // 30
d = der(sig)
st = staz(sig)
m = []
for s in st:
# media della derivata a sx (in una certa finestra)
d_sx = np.average(d[s-window:s])
# media della derivata a dx
d_dx = np.average(d[s:s+window])
if d_sx > 0 and d_dx < 0:
m.append(s)
# prima è massimi più grandi
m = sorted(m,key=lambda x: -sig[x])
# filtraggio massimi troppo vicini
window = 300 #sig.size // 30
m_filt = []
m_filt.append(m[0])
for i in m:
if len([j for j in m_filt if np.abs(i - j) > window]) == len(m_filt):
m_filt.append(i)
return m_filt
dev_X[688]).Dopo aver osservato i risultati ho notato che è meglio fare il segnale alla quarta e non al quadrato
from scipy import fftpack
fig, ax = plt.subplots(4,1,figsize=(15,15))
sig = X_d[0] #dev_X[i_lunghi[3]] #621
sig = sig / np.max(np.abs(sig))
win = sig.size // 10
ax[0].plot(sig)
ax[0].set_title('Segnale originale')
ax[1].plot(sig**2)
ax[1].set_title('Segnale al quadrato')
conv = np.convolve(sig**2,np.ones(win), mode='same')
ax[2].plot(conv)
ax[2].set_title('Convoluzione x porta')
# Trasformata fi fourier
fft = fftpack.fft(conv)
freq = fftpack.fftfreq(fft.size, d=1/8000)
# filtro potenza minima in frequenza
spectrum = np.abs(fft)#fft**2
filt_fft = fft.copy()
#cutoff_idx = spectrum < (spectrum.max()/10)
#cutoff_idx = spectrum < (spectrum.max()/50)
#filt_fft[cutoff_idx] = 0
# filtro passa basso
t = 10
filt_fft[t:filt_fft.size//2] = 0
filt_fft[filt_fft.size//2 +t:] = 0
# Segnale ricostruito
cv = np.real(fftpack.ifft(filt_fft))
cv = (cv - np.min(cv))
cv = cv / np.max(cv)
i_max = massimi(cv)
i_max = [i for i in i_max if cv[i] > .2]
ax[3].plot(cv)
ax[3].plot(i_max,cv[i_max],'o')
ax[3].set_title('Convoluzione ricostruita\nMassimi')
plt.tight_layout()
plt.plot();
def massimi_potenza(sig, window_max=None):
"""Calcola la posizione delle principali 'gobbe' del segnale.
Ritorna le posizioni in base all'importanza del massimo.
"""
# Normalizzazione
sig = sig / np.max(np.abs(sig))
win = sig.size // 10
# Convoluzione del segnale alla quarta
conv = np.convolve(sig**4,np.ones(win), mode='same')
# Trasformata fi fourier
fft = fftpack.fft(conv)
freq = fftpack.fftfreq(fft.size, d=1/8000)
# filtro potenza minima in frequenza
spectrum = np.abs(fft)
filt_fft = fft.copy()
#cutoff_idx = spectrum < (spectrum.max()/10)
#cutoff_idx = spectrum < (spectrum.max()/50)
#filt_fft[cutoff_idx] = 0
# filtro passa basso
t = 10
filt_fft[t:filt_fft.size//2] = 0
filt_fft[filt_fft.size//2 +t:] = 0
# Segnale ricostruito
cv = np.real(fftpack.ifft(filt_fft))
cv = (cv - np.min(cv))
cv = cv / np.max(cv)
i_max = massimi(cv,window=window_max)
i_max = [i for i in i_max if cv[i] > .2]
# Sort per importanza
i_max = sorted(i_max,key=lambda x: -cv[x])
return np.array(i_max)
n_plot = 100
fig, ax = plt.subplots(n_plot,1,figsize=(15,4*n_plot))
for i in range(n_plot):
sig = X_d[i]
mas = massimi_potenza(sig)
ax[i].plot(sig)
ax[i].plot(mas[1:],np.zeros(len(mas)-1),'o',markersize=20)
# Il più alto
ax[i].plot(mas[0],0,'*', markersize=25)
Alcune features che posso estrarre sono:
def zero_crossing_rate(v):
count = 0
i = 0
while v[i] == 0 and i < v.size:
i += 1
prec = v[i]
for j in range(i+1,v.size):
if v[j] == 0:
continue
if v[j] * prec < 0:
count += 1
prec = v[j]
return count
# Example
my_array = np.array([0,0,-4,0,0,5,4,-1]) # -> 2
print(zero_crossing_rate(my_array))
plt.plot(my_array);
from scipy import fftpack
def freq_domain(sig, sampling_rate=8000, cutoff=.2):
"""
Return: bilateral_frequencies, bilateral_amplitude, bilateral_filtered_amplitude
Note: power = np.abs(bilateral_amplitude)
positive_frequencies = bilateral_frequencies[:bilateral_frequencies.size // 2]
"""
time_step = 1/sampling_rate
# The FFT of the signal
sig_fft = fftpack.fft(sig)
# And the power (sig_fft is of complex dtype)
power = np.abs(sig_fft)
# The corresponding frequencies
sample_freq = fftpack.fftfreq(sig.size, d=time_step)
# Removing high frequencies
high_freq_fft = sig_fft.copy()
high_freq_fft[(power / np.max(power)) < cutoff] = 0
return sample_freq, sig_fft, high_freq_fft
Autocorrelation is useful for finding repeating patterns in a signal, such as determining the presence of a periodic signal which has been buried under noise, or identifying the fundamental frequency of a signal which doesn't actually contain that frequency component, but implies it with many harmonic frequencies (Wikipedia 2006).
Hence: Autocorrelation is good to remove noise in the frequency domain
def autocorrelation(x):
autocorr = np.correlate(x, x, mode='full')
autocorr = autocorr[autocorr.size//2:]
autocorr = autocorr / np.max(autocorr)
return autocorr
# Example
x = X_d[2]
autocorr = np.correlate(x, x, mode='full')
autocorr = autocorr[autocorr.size//2:]
autocorr = autocorr / np.max(autocorr)
fig, ax = plt.subplots(1,2,figsize=(14,5))
ax[0].plot(x)
ax[1].plot(autocorr)
plt.show()
import seaborn as sns
fig, ax = plt.subplots(10, 2, figsize=(14, 30))
for cl in range(10):
class_lab = str(cl)
signals = X_d[y_d == class_lab]
n_sig = 300
mean_pot_sig = None
for i, sig in enumerate(signals[:n_sig]):
# Considero l'autocorrelazione
sig = autocorrelation(sig)
freq, freq_amp, filt_freq_amp = freq_domain(sig, sampling_rate)
# Potenza
filt_pot = np.abs(filt_freq_amp)
pot = np.abs(freq_amp)
mean_pot_sig = mean_pot_sig + filt_pot / n_sig if mean_pot_sig is not None else filt_pot / n_sig
#mean_pot_sig = mean_pot_sig + filt_pot / n_sig if mean_pot_sig is not None else filt_pot / n_sig
# Log transformation of signal power
#mean_pot_sig = np.log(np.where(mean_pot_sig == 0, 0.1, mean_pot_sig))
ax[cl][0].plot(freq[:mean_pot_sig.size//2], mean_pot_sig[:mean_pot_sig.size//2])
ax[cl][0].set_title('Classe ' + class_lab + ': potenza media')
sns.heatmap([mean_pot_sig[:2000]], ax=ax[cl][1]);
#sns.heatmap([mean_pot_sig[:mean_pot_sig.size//2]], ax=ax[cl][1]);
ax[cl][1].set_title('Classe ' + class_lab + ': potenza heatmap')
plt.tight_layout()
plt.show()
Spectral centroid: \begin{equation} C = \frac{\sum_i f_i\cdot |A(f_i)|}{\sum_j |A(f_j)|} = \sum_i f_i \cdot \frac{|A(f_i)|}{\sum_j |A(f_j)|} \end{equation} Dove:
Osservazione:
Dunque $\frac{|A(f_i)|}{\sum_j |A(f_j)|}$ si può considerare come la distribuzione di probabilità di una variabile aleatoria discreta $f$ e $C$ è il valore atteso.
Posso quindi calcolare anche la varianza di tale variabile aleatoria con la formula:
\begin{equation} V = E(f^2) - C^2 = E(f^2) - E(f)^2 = \sum_i f_i^2 \cdot \frac{|A(f_i)|}{\sum_j |A(f_j)|} - \left(\sum_i f_i \cdot \frac{|A(f_i)|}{\sum_j |A(f_j)|}\right)^2 \end{equation}def spectral_centroid(f, pot):
return np.sum(f * pot) / np.sum(pot)
def spectral_centroid_var(f, pot, centroid=None):
"""Spectral centroid variance"""
if centroid is None:
centroid = spectral_centroid(f, pot)
return np.sum(f**2 * pot) / np.sum(pot) - centroid
def spectral_info(sig):
"""
Ritorna lo spectral centroid e la deviazione standard,
considerando la potenza come una distribuzione di probabilità
"""
# Pulisco lo spettro del segnale
sig = autocorrelation(sig)
# Fourier transform
fft = fftpack.fft(sig)
freq = fftpack.fftfreq(fft.size, d=1/8000)
# Filtraggio delle frequenze a intensità minore
pot = np.abs(fft)
fft = np.where(pot/np.max(pot) < .1, 0, fft)
# Cut: solo le freq positive
fft = fft[:fft.size//2]
freq = freq[:freq.size//2]
centroid = spectral_centroid(freq, np.abs(fft))
std_dev_spectrum = np.sqrt(spectral_centroid_var(freq, np.abs(fft),centroid))
return centroid, std_dev_spectrum
spectral_info(X_d[0])
Queste due misure sono migliori a identificare la classe quando lo spettro è filtrato o no?
class_lab = '9'
signals = X_d[y_d == class_lab]
n_sig = 5
fig, ax = plt.subplots(2*n_sig, 2, figsize=(13, 40))
mean_pot_sig = None
for i, sig in zip(range(0,2*n_sig,2),signals[:n_sig]):
# Original signal: FFT
freq, f_amp, f_filt_amp = freq_domain(sig, cutoff=.1)
f = freq[:freq.size // 2]
f_a = f_amp[:f_amp.size // 2]
f_filt_a = f_filt_amp[:f_filt_amp.size // 2]
# Spettro completo
ax[i][0].plot(f, np.abs(f_a))
ax[i][0].plot(spectral_centroid(f, np.abs(f_a)), 0, 'o') # Spectral centroid
ax[i][0].set_title(str(i//2+1) + ': Segnale originale')
# Spettro filtrato
ax[i+1][0].plot(f, np.abs(f_filt_a))
ax[i+1][0].plot(spectral_centroid(f, np.abs(f_filt_a)), 0, 'o') # Spectral centroid
ax[i+1][0].set_title(str(i//2+1) + ': Segnale originale (spettro filtrato)')
# Autocorrelation: FFT
freq_a, f_amp_a, f_filt_amp_a = freq_domain(autocorrelation(sig), cutoff=.1)
f_aut = freq_a[:freq_a.size // 2]
f_a_aut = f_amp_a[:f_amp_a.size // 2]
f_filt_a_aut = f_filt_amp_a[:f_filt_amp_a.size //2]
# Spettro completo
ax[i][1].plot(f_aut, np.abs(f_a_aut))
ax[i][1].plot(spectral_centroid(f_aut, np.abs(f_a_aut)), 0, 'o') # Spectral centroid
ax[i][1].set_title(str(i//2+1) + ': Autocorrelazione' \
+ '\nDev st: ' + str(np.sqrt(spectral_centroid_var(f_aut, np.abs(f_a_aut)))))
# Spettro filtrato
ax[i+1][1].plot(f_aut, np.abs(f_filt_a_aut))
ax[i+1][1].plot(spectral_centroid(f_aut, np.abs(f_filt_a_aut)), 0, 'o') # Spectral centroid
ax[i+1][1].set_title(str(i//2+1) + ': Autocorrelazione (spettro filtrato)' \
+ '\nDev st: ' + str(np.sqrt(spectral_centroid_var(f_aut, np.abs(f_filt_a_aut)))))
plt.tight_layout()
plt.show()
Si osserva come il comportamento in frequenza dell'autocorrelazione sia molto meno rumoroso rispetto a quello del segnale originale.
Il centroide in frequenza individua correttamente le frequenze più importanti se:
cutoff = 0.1.Next step: calcolare il centroide in frequenza per ciascun segnale e plottarne la distribuzione di ogni classe. Valutare la media e la varianza per ciascuna classe. Questa procedura è una semplificazione dello spettro in fequenza.
Riguardano la distribuzione delle ampiezze di un segnale.
Posso prenderne 10 per ogni segnale ad esempio: range(5,96,10).
def time_percentiles(sig,p):
perc = []
for i in p:
perc.append(np.percentile(sig,i))
return np.array(perc)
def freq_percentiles(sig,p):
fft = fftpack.fft(sig)
fft = np.abs(fft[:fft.size//2])
perc = []
for i in p:
perc.append(np.percentile(fft,i))
return np.array(perc)
# Andamento dei percentili dei primi n segnali
fig, ax = plt.subplots()
for j in range(10):
sig = X_d[j] # np.abs(X_d[j])
perc = []
for i in range(100):
perc.append(np.percentile(sig,i))
ax.plot(perc)
# Andamento dei percentili della potenza dei primi n segnali
fig, ax = plt.subplots()
for j in range(10):
sig = X_d[j]
fft = fftpack.fft(sig)
fft = np.abs(fft[:fft.size//2])
perc = []
for i in range(100):
perc.append(np.percentile(fft,i))
ax.plot(perc)
def massimi_potenza_freq(sig, window_max=None):
"""Calcola la posizione delle principali 'gobbe' del segnale"""
# Normalizzazione
sig = sig / np.max(np.abs(sig))
win = sig.size // 50
# Convoluzione del segnale alla quarta
conv = np.convolve(sig,np.ones(win), mode='same')
# Trasformata fi fourier
fft = fftpack.fft(conv)
freq = fftpack.fftfreq(fft.size, d=1/8000)
# filtro potenza minima in frequenza
spectrum = np.abs(fft)
filt_fft = fft.copy()
#cutoff_idx = spectrum < (spectrum.max()/10)
#cutoff_idx = spectrum < (spectrum.max()/50)
#filt_fft[cutoff_idx] = 0
# filtro passa basso
t = 10
filt_fft[t:filt_fft.size//2] = 0
filt_fft[filt_fft.size//2 +t:] = 0
# Segnale ricostruito
cv = np.real(fftpack.ifft(filt_fft))
cv = (cv - np.min(cv))
cv = cv / np.max(cv)
i_max = massimi(cv,window=window_max)
i_max = [i for i in i_max if cv[i] > .2]
return np.array(i_max)
n_plot = 5
fig, ax = plt.subplots(n_plot,1,figsize=(15,4*n_plot))
for i in range(n_plot):
sig = autocorrelation(X_d[i])
fft = fftpack.fft(sig)
freq = fftpack.fftfreq(fft.size, d=1/8000)
sig = np.abs(fft[:fft.size//2])
mas = massimi_potenza_freq(sig,30)
ax[i].plot(freq[:freq.size//2],sig)
l = freq[freq.size//2] / sig.size
ax[i].plot(l*mas,np.zeros(len(mas)),'o',markersize=10)
ax[i].plot(spectral_centroid(freq[:freq.size//2],np.where(sig/np.max(sig) < .1, 0, sig)),0,'x',markersize=10)
It is a measure of the shape of the signal. It represents the frequency below which a specified percentage of the total spectral energy, e.g. 85%, lies.
In pratica è una versione della banda al x% di energia.
def spectral_rolloff(sig, threshold=.9):
"""Return the spectral rolloff"""
if threshold > 1:
threshold = 1
fft = fftpack.fft(sig)
freq = fftpack.fftfreq(fft.size, d=1/8000)
fft = fft[:fft.size//2]
freq = freq[:freq.size//2]
pot = np.abs(fft)**2
pot = pot / np.sum(pot)
s = 0
i = 0
while s < threshold:
s += pot[i]
i += 1
return freq[i]
n_plot = 10
fig, ax = plt.subplots(n_plot,1,figsize=(15,4*n_plot))
for i in range(n_plot):
sig = autocorrelation(X_d[i])
fft = fftpack.fft(sig)
freq = fftpack.fftfreq(fft.size, d=1/8000)
fft = np.abs(fft[:fft.size//2])
ax[i].plot(freq[:freq.size//2],fft)
ax[i].plot(spectral_centroid(freq[:freq.size//2],np.where(fft/np.max(fft) < .1, 0, fft)),0,'x',markersize=10)
ax[i].plot(spectral_rolloff(sig),0,'|',markersize=10)
Il tempo che il segnale impiega a passare dal 10% al 90% della sua energia. E' interessante perchè i segnali più impulivi avranno un tempo di salita piccolo e viceversa
def rise_time(sig, T=1/8000):
sig = sig**2
sig = sig / np.sum(sig)
lower = 0
upper = sig.size-1
s = 0
for i in range(sig.size):
lower = i if s < .1 else lower
upper = i if 1-s > .1 else upper
s += sig[i]
return (upper-lower)*T #upper,lower
n_plot = 10
fig, ax = plt.subplots(n_plot,1,figsize=(15,4*n_plot))
for i in range(n_plot):
sig = X_d[i]
u,l = rise_time(sig)
ax[i].plot(sig)
ax[i].plot(l,0,'|',markersize=100)
ax[i].plot(u,0,'|',markersize=100)
Allo stesso modo posso calcolare il punto in cui l'energia raggiunge il 50%. Il tempo che un segnale impiega a raggiungere il 50% di energia è una casatteristica assoluta nel segnale (tempo dall'inizio). Il rise_time() invece è una carattereistica indipendente dalla posizione nel segnale.
def centro_energia(sig):
sig = sig**2
sig = sig / np.sum(sig)
s = 0
i = 0
while s < .5 and i < sig.size:
s += sig[i]
i += 1
return i
n_plot = 10
fig, ax = plt.subplots(n_plot,1,figsize=(15,4*n_plot))
for i in range(n_plot):
sig = X_d[i]
u = centro_energia(sig)
ax[i].plot(sig)
ax[i].plot(u,0,'|',markersize=100)
Estensione del concetto di centro ti energia: il centro è il 50% dell'energia
def perc_energia(sig, p):
"""Ritorna l'indice del segnale al quale la percentuale di energia vale 'p', in (0,1)"""
sig = sig**2
sig = sig / np.sum(sig)
s = 0
i = 0
while s < p and i < sig.size:
s += sig[i]
i += 1
return i
def min_dist(sig):
if sig.size < 2:
return 0
sig = np.sort(sig)
return min([np.abs(sig[i]-sig[i-1]) for i in range(1,sig.size)])
Una tecnica molto usata è quella di 'spezzare' il segnale e analizzare ciascun pezzo. Farò una prova calcolando la percentuale di energia per ogni frame.
def power(sig):
"""Potenza del segnale"""
return np.abs(sig)**2
def ener_dist(sig, slots=10):
"""Distribuzione di energia"""
power = np.abs(sig)**2
dist = np.zeros(slots)
slot_length = sig.size//(slots-1)
for i in range(0,sig.size,slot_length):
s = np.sum(power[i:i+slot_length])
dist[i//slot_length] = s
# Avanzi
dist[-1] += np.sum(power[-(sig.size % slots)])
return dist / np.sum(power)
fig, ax = plt.subplots(figsize=(14,4))
ax.plot(ener_dist(X_d[1]));
import scipy.stats as stats
def extract_features(sig):
f = dict()
t = 1/8000
# TIME
# Signal length
f['length'] = sig.size * t
# Amplitude: mean, stddev, min, max
f['avg_amp'] = np.average(sig)
f['stddev_amp'] = np.std(sig)
f['max_amp'] = np.max(sig)
f['min_amp'] = np.min(sig)
# abs(Amplitude): mean, stddev, min, max
f['avg_abs_amp'] = np.average(np.abs(sig))
f['stddev_abs_amp'] = np.std(np.abs(sig))
f['max_abs_amp'] = np.max(np.abs(sig))
f['min_abs_amp'] = np.min(np.abs(sig))
# Signal kurtosis
f['sig_kurtosis'] = stats.kurtosis(sig)
# Signal skewness
f['sig_skew'] = stats.skew(sig)
# Linear regression
slope, intercept, r_value, p_value, std_err = stats.linregress(t*np.array(range(sig.size)),sig)
f['sig_linreg_slope'] = slope
f['sig_linreg_intercept'] = intercept
f['sig_linreg_r_value'] = r_value
f['sig_linreg_p_value'] = p_value
f['sig_linreg_std_err'] = std_err
# 'Gobbe'
massimi = massimi_potenza(sig) # indici di sig corrispondenti ai massimi
f['n_gob'] = massimi.size
f['min_dist_gob'] = min_dist(massimi) * t
f['best_gob'] = massimi[0] * t # è la gobba più alta
# Zero crossing rate
f['zero_crossing_rate'] = zero_crossing_rate(sig)
# Rise time
f['rise_time'] = rise_time(sig)
# Energy center
f['ener_center'] = centro_energia(sig) * t
# Percentuale di energia
p = range(5,96,10) # percentuali
for i in p:
f['ener_' + str(i) + '_perc'] = perc_energia(sig,i/100) * t
#Energy distribution (%): riassume la forma del segnale
slots = 10
ener_d = ener_dist(sig, slots)
for i,en in enumerate(ener_d):
f['ener_dist_' + str(i+1)] = en
# Numero di slot con % energia superiore alla media
f['top_slots'] = len([i for i in ener_d if i > np.average(ener_d)]) / slots
# FREQUENCY
# FFT
fft = fftpack.fft(sig)
freq = fftpack.fftfreq(fft.size, d=t)
power = np.abs(fft[:fft.size//2])
# Power base statistics
f['f_avg_power'] = np.average(power)
f['f_stddev_power'] = np.std(power)
f['f_min_power'] = np.min(power)
f['f_max_power'] = np.max(power)
# Power kurtosis
f['f_power_kurtosis'] = stats.kurtosis(power)
# Signal skewness
f['f_power_skew'] = stats.skew(power)
# Linear regression
slope, intercept, r_value, p_value, std_err = stats.linregress(freq[:freq.size//2],power)
f['f_power_linreg_slope'] = slope
f['f_power_linreg_intercept'] = intercept
f['f_power_linreg_r_value'] = r_value
f['f_power_linreg_p_value'] = p_value
f['f_power_linreg_std_err'] = std_err
# Spectral centroid & Spectrum stddev
sp_centroid, sp_std = spectral_info(sig)
f['spec_centroid'] = sp_centroid
f['stddev_spec'] = sp_std
# Spectral rolloff (freq. sotto la quale ho il 90% di energia)
f['spec_rolloff'] = spectral_rolloff(sig)
# Spectral rolloff a tante frequenze
p = range(5,96,10) # percentuali
for i in p:
f['spec_rolloff_' + str(i) + '_perc'] = spectral_rolloff(sig,i/100)
# PERCENTILES
p = range(5,96,10)
# Time
# Percentili del segnale
for i in p:
f['time_' + str(i) + '_perc'] = np.percentile(sig,i)
# Percentili della derivata del segnale
for i in p:
f['time_der_' + str(i) + '_perc'] = np.percentile(der(sig),i)
# Frequency (no filtraggio o autocorr)
for i in p:
f['freq_' + str(i) + '_perc'] = np.percentile(power,i)
# Frequency: percentili della derivata della potenza
for i in p:
f['freq_der_' + str(i) + '_perc'] = np.percentile(der(power),i)
return f
extract_features(X_d[0])
import pandas as pd
X_df = pd.DataFrame([extract_features(X_d[i]) for i in range(X_d.size)])
X_df
X_df.describe()
X_df.info()
import seaborn as sns; sns.set()
def plot_heatmap(X_df):
#compute the correlation matrix
correlation_matrix = X_df.corr()
feature_names = X_df.columns
# Generate a mask for the upper triangle
mask = np.zeros_like(correlation_matrix, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(15, 15))
# Generate a custom diverging colormap
cmap = sns.diverging_palette(220, 10, as_cmap=True)
# Draw the heatmap with the mask and correct aspect ratio
ax = sns.heatmap(correlation_matrix, mask=mask, cmap=cmap, vmax=1, vmin = -1, center=0,
square=True, linewidths=.5);
#build the heatmap
#ax = sns.heatmap(
# correlation_matrix,
# vmin=-1, vmax=1, center=0,
# cmap=sns.diverging_palette(20, 220, n=200),
# square=True
#)
ax.set_xticklabels(
feature_names,
rotation=45,
horizontalalignment='right'
);
plot_heatmap(X_df)
print('All the strongly correlated features:\n')
threshold = .85
correlation_matrix = X_df.corr()
feature_names = X_df.columns
# Lista di features correlate tra loro
correlated = []
for i in range(len(correlation_matrix)):
for j in range(0,i):
if correlation_matrix.iloc[i,j] > threshold or correlation_matrix.iloc[i,j] < -threshold:
print(f'({feature_names[i]}, {feature_names[j]}) = {correlation_matrix.iloc[i,j]:.3f}')
# Connessione
correlated.append([feature_names[i], feature_names[j]])
import numpy as np
def componenti_connesse(connessioni):
"""
Restituisce le componenti connesse a partire da una lista di connessioni
connessioni:lista di liste
"""
elem = []
for c in connessioni:
elem.append(c[0])
elem.append(c[1])
# Vettore di elementi: possono essere qualsiasi oggetto
elem = list(set(elem))
# Vettore delle connessioni
v = np.array([i for i in range(len(elem))])
for c in connessioni:
# Hashing
i = elem.index(c[0])
j = elem.index(c[1])
# Verifica connessione
if v[i] != v[j]:
# Creazione connessione
b = v[j]
a = v[i]
for k in range(v.size):
if v[k] == a:
v[k] = b
# Estrazione componenti connesse
cc = list(set(v))
elem = np.array(elem)
cc_el = []
for c in cc:
cc_el.append(elem[v == c].tolist())
return cc_el
# Componenti connesse della correlazione tra features
# Non sto differenziando correlazione positiva e negativa. lo devo fare?
comp_conn = componenti_connesse(correlated)
comp_conn
def rapp_feature(comp_conn, correlation_matrix):
corr = correlation_matrix.copy()
# Rimuovo la correlazione di un indice con sè stesso
for c in corr.columns:
corr.loc[c,c] = 0
# Faccio il modulo della correlazione: positiva e negativa non si devono cancellare a vicenda
corr = np.abs(corr)
# Correlazione totale di ciscuna feature con le altre
somma_corr = corr.sum()
rappr_features = []
# Per ciasscuna comp_conn
for cc in comp_conn:
# Lo score viene calcolato come: correlazione_interna(comp conn)/correlazione_totale
f, f_score = max([(feat, f_corr/somma_corr[feat]) for feat,f_corr in corr.loc[cc,cc].sum().items()],\
key=lambda x: x[1])
rappr_features.append(f)
return rappr_features
def riduzione_correlate(X,threshold=.85):
"""Riceve un DataFrame X e ritorna X ridotto"""
correlation_matrix = X.corr()
feature_names = X.columns
# Lista di features correlate tra loro
correlated = []
for i in range(len(correlation_matrix)):
for j in range(i):
if correlation_matrix.iloc[i,j] > threshold or correlation_matrix.iloc[i,j] < -threshold:
# Connessione
correlated.append([feature_names[i], feature_names[j]])
# Componenti connesse della correlazione tra features
# Non sto differenziando correlazione positiva e negativa. lo devo fare? Non credo
comp_conn = componenti_connesse(correlated)
# Feature rappresentante per ogni componente connessa
best_feat = rapp_feature(comp_conn, correlation_matrix)
# Rimozione delle features: tutte quelle delle componenti connesse meno il rappresentante
bad_feat = []
for i,cc in enumerate(comp_conn):
bad_feat.extend(list(filter(lambda x: x != best_feat[i], cc)))
# Drop delle colonne
X_new = X.drop(columns=bad_feat)
return X_new
X_old = X_df.copy()
X_df = riduzione_correlate(X_old)
plot_heatmap(X_df)
Alcune features possono essere fortemente correlate tra loro, il chè può avere effetti negativi sulle performanecs (o comunque non le migliora).
Riduzione - pipeline:
Per ogni componente connessa estraggo un rappresentante. Esso è la feature maggiormente correlata all'interno della componente connessa e meno correlata con le altre features (esterne). Per fare questo prima rimuovo la diagonale della matrice di correlazione (che è tutta a 1 ed è inutile), poi faccio il modulo della stessa. Non voglio che sommando le correlazioni quelle positive si cancellino on quelle negative.
Infine, lo score della feature è così calcolato:
\begin{equation} \lambda = \frac{\sum_{j \in CC} corr_{j}}{\sum_{i=1}^{N_{feat}} corr_{i}} \end{equation}
Ossia, per ogni feature di una componente connessa, è il rapporto tra la somma delle correlazioni che essa ha con le features della stessa componente connessa e la somma delle correlazioni che ha con tutte le features.
X = X_df.values
X.shape
from sklearn.preprocessing import KBinsDiscretizer
n_ft = X.shape[1]
est = KBinsDiscretizer(n_bins=[2 for i in range(n_ft)], encode='ordinal').fit(X)
# X = est.transform(X)
Fortemente condigliata per le SVM
from sklearn import preprocessing
# Min max normalization
min_max_scaler = preprocessing.MinMaxScaler()
X_mm = min_max_scaler.fit_transform(X)
# z-score normalization
z_scaler = preprocessing.StandardScaler()
X_z = z_scaler.fit_transform(X)
# X_z Sembra dare performances migliori (soprattutto x le SVM e log.regress):
X = X_z
Sia la discretization che la normalization hanno pompato le performances di tutti i classificatori (tranne le rnd forest, che sono rimaste circa uguali). Il fatto è che usando discretization + normalization si ottengono più o meno gli stessi miglioramenti ottenuti usando anche solo una delle due. Conviene comunque provare a metterle in cascata.
Il motivo di questo è che molti algoritmi di classificazione (SVM, KNN, Logistic regression...) non sono scale independent e necessitano che tutte le features vengano rappresentate con la stessa scala. Questo risultato si ottiene normalizzando, o discretizzando (con perdita di informazione).
Le random forest invece sono scale independent (?) perchè hanno dato gli stessi risultati indipendentemente dalla normalizzazione.
"Support Vector Machine algorithms are not scale invariant, so it is highly recommended to scale your data. For example, scale each attribute on the input vector X to [0,1] or [-1,+1], or standardize it to have mean 0 and variance 1." [Fonte: https://scikit-learn.org/stable/modules/svm.html]
import sklearn
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(X, y_d, test_size=.2)
from sklearn.ensemble import RandomForestClassifier
# Hanno beneficiato (poco poco) della data discretization
rf = RandomForestClassifier(n_estimators=1000)
rf.fit(x_train,y_train)
# Performance assessment
from sklearn.metrics import classification_report
y_p = rf.predict(x_test)
print(classification_report(y_test, y_p))
rf.feature_importances_
sorted([(f, imp) for f,imp in zip(feature_names, rf.feature_importances_)], key=lambda x: -x[1])
from sklearn.model_selection import GridSearchCV
params = {
'criterion': ['gini', 'entropy'],
'max_depth': [None, 20, 40],
#'max_leaf_nodes': [None, 10, 15, 20],
#'min_impurity_decrease': [0.0, 0.1, 0.01]
'max_features': ['sqrt', 'log2', None]
}
clf = RandomForestClassifier(n_estimators=100)
# Adesso la divisione tra training set e validation set è ciclica e
# la rotazione e la valutazione della miglior configurazione viene
# gestita dalla classe GridSearchCV.
gridsearch = GridSearchCV(clf, params, scoring='accuracy', cv=5, iid=False)
# Il fit viene sempre fatto su {validation + training} set!!
gridsearch.fit(x_train,y_train)
# Best accuracy sul validation set
#print(gridsearch.best_score_)
# Test delle performances
y_p = gridsearch.best_estimator_.predict(x_test)
#acc = accuracy_score(y_test, y_p)
print(classification_report(y_test, y_p))
best_params = gridsearch.best_params_
best_params
rf = RandomForestClassifier(n_estimators=1000, **best_params)
rf.fit(x_train,y_train)
y_p = rf.predict(x_test)
print(classification_report(y_test, y_p))
from sklearn.svm import SVC
# Hanno bisogno che il dataset sia z-normalizzato!! e funzionano meglio se discretizzato
svml = SVC(C=1, gamma='scale')
svml.fit(x_train, y_train)
y_p = svml.predict(x_test)
print(classification_report(y_test, y_p))
Naive Bayes assumption:
\begin{equation} p(f_1,..., f_n|c) = \prod_{i=1}^n p(f_i|c) \end{equation}Dove $f_i$ è la feature i-esima e $c$ è una classe.
$p(f_i|c)$ è la distribuzione di probabilità della feature i nella classe c.
In questo caso un Gaussian Naive Bayes o un CategoricalNB dovrebbe andare.
from sklearn.naive_bayes import GaussianNB #, CategoricalNB
gaussnb = GaussianNB()
gaussnb.fit(x_train, y_train)
y_p = gaussnb.predict(x_test)
print(classification_report(y_test, y_p))
#catenb = CategoricalNB()
#catenb.fit(x_train, y_train)
#y_p = catenb.predict(x_test)
#print(classification_report(y_test, y_p))
from sklearn.neighbors import KNeighborsClassifier
# FUnzionano meglio se il dataset è normalizzato!!
knn = KNeighborsClassifier()
knn.fit(x_train, y_train)
y_p = knn.predict(x_test)
print(classification_report(y_test, y_p))
from sklearn.linear_model import LogisticRegression
#Funziona meglio se il dataset è normalizzato (meglio z-score)!
lrclass = LogisticRegression(solver='lbfgs', multi_class='ovr', max_iter=100000)
lrclass.fit(x_train, y_train)
y_p = lrclass.predict(x_test)
print(classification_report(y_test, y_p))